package workerpool

import (
	"fmt"
	"sync"
	"time"
)

type Pool struct {
	Tasks   []*Task   // 所有待处理的任务
	Workers []*Worker // Workers 保存所有的 worker

	concurrency   int            // 生成与 concurrency 数量一致的 goroutine
	collector     chan *Task     // workers 之间共享缓存 channel -- collector
	runBackground chan bool      // runBackground 用于维持 pool 存活状态。
	wg            sync.WaitGroup // 实现协程之间的同步
}

func NewPool(tasks []*Task, concurrency int) *Pool {
	return &Pool{
		Tasks:       tasks,
		concurrency: concurrency,
		collector:   make(chan *Task, 1000),
	}
}

/**
AddTask() 方法用于往 collector 添加任务
 */
func (p *Pool) AddTask(task *Task) {
	p.collector <- task
}

/**
RunBackground() 方法衍生出一个无限运行的 goroutine，以便 pool 维持存活状态，
因为 runBackground 信道是空，读取空的 channel 会阻塞，所以 pool 能维持运行状态。
 */
func (p *Pool) RunBackground() {
	go func() {
		for {
			fmt.Println("⌛ Waiting for tasks to come in ...\n")
			time.Sleep(10 * time.Second)
		}
	}()

	for i := 1; i <= p.concurrency; i++ {
		worker := NewWorker(p.collector, i)
		p.Workers = append(p.Workers, worker)
		go worker.StartBackgroud()
	}

	for i := range p.Tasks {
		p.collector <- p.Tasks[i]
	}

	p.runBackground = make(chan bool)
	<-p.runBackground
}

// 并且给 runBackground 发送停止信号以便结束 RunBackground() 方法。
func (p *Pool) Stop() {
	for i := range p.Workers {
		p.Workers[i].Stop()
	}
	p.runBackground <- true
}

func (p *Pool) Run() {
	for i := 1; i <= p.concurrency; i++ {
		worker := NewWorker(p.collector, i)
		worker.Start(&p.wg)
	}

	for i := range p.Tasks {
		p.collector <- p.Tasks[i]
	}
	close(p.collector)

	p.wg.Wait()
}
